<?php

namespace kafka;


/**
 * Kafka configuration: a thin wrapper around \RdKafka\Conf.
 *
 * Besides the helpers declared below, any method that is not defined on this
 * class (for example set()) is forwarded to the wrapped \RdKafka\Conf instance
 * through __call().
 *
 * @link https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
 * @link https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/class.rdkafka.html
 *
 * @method set($name, $value) Forwarded to the wrapped \RdKafka\Conf instance.
 */
class Conf
{

    /** @var \RdKafka\Conf The wrapped configuration object that every call is applied to. */
    private $instance;


    /**
     * Wraps an existing \RdKafka\Conf instance.
     *
     * @param \RdKafka\Conf $conf Configuration object to wrap.
     */
    public function __construct(\RdKafka\Conf $conf)
    {
        $this->instance = $conf;
    }


    /**
     * Builds a configuration with the basic settings for a plain connection.
     *
     * Sets 'api.version.request' to 'true', 'log_level' to LOG_INFO and
     * 'metadata.broker.list' to the given broker list.
     *
     * @param string $brokers Value stored under 'metadata.broker.list'.
     *
     * @return self
     */
    public static function newSimpleKafkaConf(string $brokers) : self
    {
        $conf = new \RdKafka\Conf();
        $conf->set('api.version.request', 'true');
        $conf->set('log_level', (string)LOG_INFO);
        $conf->set('metadata.broker.list', $brokers);

        return new self($conf);
    }


    /**
     * Builds a basic configuration and, when a username is given, adds
     * SASL/PLAIN authentication over SSL.
     *
     * The CA certificate is read from the file 'mix-4096-ca-cert' located next
     * to this source file. When no username is given (null or empty string) the
     * result is the same as newSimpleKafkaConf().
     *
     * @param string      $brokers  Value stored under 'metadata.broker.list'.
     * @param string|null $username SASL username; null or '' disables SASL.
     * @param string|null $password SASL password; null is sent as an empty string.
     *
     * @return self
     */
    public static function newSimpleSASLKafkaConf(string $brokers, ?string $username, ?string $password) : self
    {
        $conf = static::newSimpleKafkaConf($brokers);

        // Explicit comparison: a plain truthiness test would treat the username "0" as absent.
        if ($username !== null && $username !== '') {
            $conf->set('sasl.mechanisms', 'PLAIN');
            $conf->set('sasl.username', $username);
            // The value is forwarded to \RdKafka\Conf::set(), which expects a string;
            // cast so that a null password does not reach it as null.
            $conf->set('sasl.password', (string)$password);
            $conf->set('security.protocol', 'SASL_SSL');
            $conf->set('ssl.ca.location', __DIR__ . '/mix-4096-ca-cert');
        }

        return $conf;
    }


    /**
     * Returns the wrapped \RdKafka\Conf instance.
     *
     * @return \RdKafka\Conf
     */
    public function getKafkaConfInstance() : \RdKafka\Conf
    {
        return $this->instance;
    }


    /**
     * Replaces the wrapped \RdKafka\Conf instance.
     *
     * @param \RdKafka\Conf $instance New configuration object to wrap.
     */
    public function setKafkaConfInstance(\RdKafka\Conf $instance) : void
    {
        $this->instance = $instance;
    }


    /**
     * Sets the producer acknowledgement mode (the 'acks' property).
     *
     *  0: the producer does not wait for any acknowledgement from the broker
     *     before sending the next message (batch).
     *  1: the producer sends the next message once the leader has received the
     *     data and acknowledged it.
     * -1: a send only counts as complete after the replicas have acknowledged
     *     receipt of the data as well.
     *
     * From 0 to 1 to -1 producer throughput decreases while data durability
     * increases.
     *
     * @param int $ackMode One of 0, 1 or -1 as described above.
     */
    public function setAckMode(int $ackMode)
    {
        $this->instance->set('acks', (string)$ackMode);
    }


    /**
     * Sets the 'log_level' property.
     *
     * @param int $logLevel Log level, e.g. one of PHP's LOG_* constants; defaults to LOG_DEBUG.
     */
    public function setLogLevel(int $logLevel = LOG_DEBUG)
    {
        $this->instance->set('log_level', (string)$logLevel);
    }


    /**
     * Registers a delivery-report callback that dispatches on the message result.
     *
     * When the reported message carries a non-zero error code, $errorCallback is
     * invoked; otherwise $successCallback is invoked if one was supplied. Both
     * receive the arguments ($kafka, $message) given to the delivery-report callback.
     *
     * @param callable      $errorCallback   Called for messages whose err field is non-zero.
     * @param callable|null $successCallback Called for messages without an error; optional.
     */
    public function setSendCallback(callable $errorCallback, ?callable $successCallback = null) : void
    {
        // static: the closure never touches $this, so it does not need to capture it.
        $this->instance->setDrMsgCb(
            static function (\RdKafka $kafka, \RdKafka\Message $message) use ($successCallback, $errorCallback) {
                if ($message->err) {
                    $errorCallback($kafka, $message);

                    return;
                }

                if ($successCallback !== null) {
                    $successCallback($kafka, $message);
                }
            }
        );
    }


    /**
     * Registers an error callback.
     *
     * $errorCallback receives ($kafka, $err, $reason, $errStr), where $errStr is
     * the result of rd_kafka_err2str($err).
     *
     * @param callable $errorCallback Called with the four arguments listed above.
     */
    public function setErrorCallback(callable $errorCallback) : void
    {
        // static: the closure never touches $this, so it does not need to capture it.
        $this->instance->setErrorCb(static function ($kafka, int $err, string $reason) use ($errorCallback) {
            $errorCallback($kafka, $err, $reason, rd_kafka_err2str($err));
        });
    }


    /**
     * Writing to an undeclared property sets the configuration property of the
     * same name on the wrapped instance, e.g. $conf->{'group.id'} = 'my-group'.
     *
     * @param string $name  Configuration property name.
     * @param mixed  $value Configuration value, passed on to set() unchanged.
     */
    public function __set($name, $value) : void
    {
        $this->instance->set($name, $value);
    }

    /**
     * Forwards calls to methods not defined on this class to the wrapped instance.
     *
     * @param string $name Method name to call on the wrapped \RdKafka\Conf.
     * @param array  $args Arguments to pass through.
     *
     * @return mixed Whatever the wrapped method returns.
     */
    public function __call($name, $args)
    {
        return $this->instance->$name(...$args);
    }

}